{ NULL },
};
+typedef struct {
+ enum {
+ PULL_MSG_SCAN_IDLE,
+ PULL_MSG_MAIN_IDLE,
+ PULL_MSG_FETCH,
+ PULL_MSG_SCAN,
+ PULL_MSG_QUIT
+ } t;
+ union {
+ guint idle_serial;
+ GVariant *item;
+ } d;
+} PullWorkerMessage;
+
typedef struct {
OstreeRepo *repo;
char *remote_name;
GMainLoop *loop;
GCancellable *cancellable;
- gboolean metadata_scan_active;
volatile gint n_scanned_metadata;
- volatile gint n_requested_metadata;
- volatile gint n_requested_content;
- guint n_fetched_metadata;
guint outstanding_uri_requests;
- OtWorkerQueue *metadata_objects_to_scan;
- GHashTable *scanned_metadata; /* Maps object name to itself */
- GHashTable *requested_content; /* Maps object name to itself */
- guint n_outstanding_metadata_fetches;
-
- guint n_fetched_content;
- guint outstanding_filecontent_requests;
- guint outstanding_content_stage_requests;
+ GThread *metadata_thread;
+ GMainContext *metadata_thread_context;
+ GMainLoop *metadata_thread_loop;
+ OtWaitableQueue *metadata_objects_to_scan;
+ OtWaitableQueue *metadata_objects_to_fetch;
+ GHashTable *scanned_metadata; /* Maps object name to itself */
+ GHashTable *requested_metadata; /* Maps object name to itself */
+ GHashTable *requested_content; /* Maps object name to itself */
+ guint metadata_scan_idle : 1; /* TRUE if we passed through an idle message */
+ guint idle_serial; /* Incremented when we get a SCAN_IDLE message */
+ guint n_outstanding_metadata_fetches;
+ guint n_outstanding_metadata_stage_requests;
+ guint n_outstanding_content_fetches;
+ guint n_outstanding_content_stage_requests;
+ gint n_requested_metadata;
+ gint n_requested_content;
+ guint n_fetched_metadata;
+ guint n_fetched_content;
gboolean have_previous_bytes;
guint64 previous_bytes_sec;
} OtPullData;
typedef struct {
- OtPullData *pull_data;
-
- gboolean fetching_content;
-
- GFile *meta_path;
- GFile *content_path;
-
- char *checksum;
-} OtFetchOneContentItemData;
+ OtPullData *pull_data;
+ GVariant *object;
+ GFile *temp_path;
+} FetchObjectData;
static SoupURI *
suburi_new (SoupURI *base,
uri_fetch_update_status (gpointer user_data)
{
OtPullData *pull_data = user_data;
- ot_lfree char *fetcher_status;
+ ot_lfree char *fetcher_status = NULL;
GString *status;
guint64 current_bytes_transferred;
guint64 current_delta_bytes_transferred;
guint64 delta_bytes_transferred;
+ guint outstanding_stages;
+ guint outstanding_fetches;
status = g_string_new ("");
- if (pull_data->metadata_scan_active)
+ if (!pull_data->metadata_scan_idle)
g_string_append_printf (status, "scan: %u metadata; ",
g_atomic_int_get (&pull_data->n_scanned_metadata));
- g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
- g_atomic_int_get (&pull_data->n_fetched_metadata),
- g_atomic_int_get (&pull_data->n_requested_metadata),
- pull_data->n_fetched_content,
- g_atomic_int_get (&pull_data->n_requested_content));
-
- current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
- current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
+ outstanding_stages = pull_data->n_outstanding_content_stage_requests + pull_data->n_outstanding_metadata_stage_requests;
+ if (outstanding_stages > 0)
+ g_string_append_printf (status, "writing: %u objects; ", outstanding_stages);
- if (pull_data->have_previous_bytes)
- delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec);
- else
+ outstanding_fetches = pull_data->n_outstanding_content_fetches + pull_data->n_outstanding_metadata_fetches;
+ if (outstanding_fetches)
{
- pull_data->have_previous_bytes = TRUE;
- delta_bytes_transferred = current_delta_bytes_transferred;
- }
- pull_data->previous_bytes_sec = delta_bytes_transferred;
- pull_data->previous_total_downloaded = current_bytes_transferred;
+ g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
+ pull_data->n_fetched_metadata,
+ pull_data->n_requested_metadata,
+ pull_data->n_fetched_content,
+ pull_data->n_requested_content);
- if (delta_bytes_transferred < 1024)
- g_string_append_printf (status, "%u B/s; ",
- (guint)delta_bytes_transferred);
- else
- g_string_append_printf (status, "%.1f KiB/s; ",
- (double)delta_bytes_transferred / 1024);
+ current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
+ current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
- fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
- g_string_append (status, fetcher_status);
+ if (pull_data->have_previous_bytes)
+ delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec);
+ else
+ {
+ pull_data->have_previous_bytes = TRUE;
+ delta_bytes_transferred = current_delta_bytes_transferred;
+ }
+ pull_data->previous_bytes_sec = delta_bytes_transferred;
+ pull_data->previous_total_downloaded = current_bytes_transferred;
+
+ if (delta_bytes_transferred < 1024)
+ g_string_append_printf (status, "%u B/s; ",
+ (guint)delta_bytes_transferred);
+ else
+ g_string_append_printf (status, "%.1f KiB/s; ",
+ (double)delta_bytes_transferred / 1024);
+
+ fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
+ g_string_append (status, fetcher_status);
+ }
gs_console_begin_status_line (gs_console_get (), status->str, NULL, NULL);
return TRUE;
}
+static PullWorkerMessage *
+pull_worker_message_new (int msgtype, gpointer data)
+{
+ PullWorkerMessage *msg = g_new (PullWorkerMessage, 1);
+ msg->t = msgtype;
+ switch (msgtype)
+ {
+ case PULL_MSG_SCAN_IDLE:
+ case PULL_MSG_MAIN_IDLE:
+ msg->d.idle_serial = GPOINTER_TO_UINT (data);
+ break;
+ case PULL_MSG_SCAN:
+ case PULL_MSG_FETCH:
+ msg->d.item = data;
+ break;
+ case PULL_MSG_QUIT:
+ break;
+ }
+ return msg;
+}
+
static void
throw_async_error (OtPullData *pull_data,
GError *error)
check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error)
{
- if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) &&
- pull_data->outstanding_uri_requests == 0 &&
- pull_data->outstanding_filecontent_requests == 0 &&
- pull_data->n_outstanding_metadata_fetches == 0 &&
- pull_data->outstanding_content_stage_requests == 0)
- g_main_loop_quit (pull_data->loop);
+ gboolean current_fetch_idle = (pull_data->n_outstanding_metadata_fetches == 0 &&
+ pull_data->n_outstanding_content_fetches == 0);
+ gboolean current_stage_idle = (pull_data->n_outstanding_metadata_stage_requests == 0 &&
+ pull_data->n_outstanding_content_stage_requests == 0);
+
+ g_debug ("pull: scan: %u fetching: %u staging: %u",
+ !pull_data->metadata_scan_idle, !current_fetch_idle, !current_stage_idle);
+
+ /* This is true in the phase when we're fetching refs */
+ if (pull_data->metadata_objects_to_scan == NULL)
+ {
+ if (pull_data->outstanding_uri_requests == 0)
+ g_main_loop_quit (pull_data->loop);
+ return;
+ }
+ else if (pull_data->metadata_scan_idle && current_fetch_idle && current_stage_idle)
+ {
+ g_main_loop_quit (pull_data->loop);
+ }
+
throw_async_error (pull_data, error);
}
g_source_unref (update_timeout);
}
+ g_idle_add (idle_check_outstanding_requests, pull_data);
g_main_loop_run (pull_data->loop);
if (console)
fetch_data.pull_data = pull_data;
uri_string = soup_uri_to_string (uri, FALSE);
- g_print ("Fetching %s\n", uri_string);
pull_data->outstanding_uri_requests++;
ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable,
return ret;
}
-static gboolean
-idle_queue_content_request (gpointer user_data);
-
static gboolean
scan_dirtree_object (OtPullData *pull_data,
const char *checksum,
{
const char *filename;
gboolean file_is_stored;
- OtFetchOneContentItemData *idle_fetch_data;
ot_lvariant GVariant *csum = NULL;
ot_lfree char *file_checksum;
if (!ostree_repo_has_object (pull_data->repo, OSTREE_OBJECT_TYPE_FILE, file_checksum,
&file_is_stored, cancellable, error))
goto out;
-
+
if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum))
{
- char *duped_checksum;
-
- idle_fetch_data = g_new0 (OtFetchOneContentItemData, 1);
- idle_fetch_data->pull_data = pull_data;
- idle_fetch_data->checksum = file_checksum;
- file_checksum = NULL; /* Transfer ownership */
-
- duped_checksum = g_strdup (idle_fetch_data->checksum);
- g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum);
-
- g_atomic_int_inc (&pull_data->n_requested_content);
- g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data);
+ g_hash_table_insert (pull_data->requested_content, file_checksum, file_checksum);
+
+ ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+ pull_worker_message_new (PULL_MSG_FETCH,
+ ostree_object_name_serialize (file_checksum, OSTREE_OBJECT_TYPE_FILE)));
+ file_checksum = NULL; /* Transfer ownership to hash */
}
}
return ret;
}
-static void
-destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
-{
- if (data->meta_path)
- (void) gs_file_unlink (data->meta_path, NULL, NULL);
- g_clear_object (&data->meta_path);
- if (data->content_path)
- (void) gs_file_unlink (data->content_path, NULL, NULL);
- g_clear_object (&data->content_path);
- g_free (data->checksum);
- g_free (data);
-}
-
static void
content_fetch_on_stage_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
- OtFetchOneContentItemData *data = user_data;
+ FetchObjectData *fetch_data = user_data;
+ OtPullData *pull_data = fetch_data->pull_data;
GError *local_error = NULL;
GError **error = &local_error;
+ OstreeObjectType objtype;
+ const char *expected_checksum;
ot_lfree guchar *csum = NULL;
ot_lfree char *checksum = NULL;
checksum = ostree_checksum_from_bytes (csum);
- g_assert (strcmp (checksum, data->checksum) == 0);
+ ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype);
+ g_assert (objtype == OSTREE_OBJECT_TYPE_FILE);
- data->pull_data->n_fetched_content++;
- out:
- data->pull_data->outstanding_content_stage_requests--;
- check_outstanding_requests_handle_error (data->pull_data, local_error);
- destroy_fetch_one_content_item_data (data);
-}
+ g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype));
-static void
-content_fetch_on_complete (GObject *object,
- GAsyncResult *result,
- gpointer user_data);
+ g_assert (strcmp (checksum, expected_checksum) == 0);
+ pull_data->n_fetched_content++;
+ out:
+ pull_data->n_outstanding_content_stage_requests--;
+ check_outstanding_requests_handle_error (pull_data, local_error);
+ (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
+ g_object_unref (fetch_data->temp_path);
+ g_variant_unref (fetch_data->object);
+ g_free (fetch_data);
+}
static void
content_fetch_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
- OtFetchOneContentItemData *data = user_data;
+ FetchObjectData *fetch_data = user_data;
+ OtPullData *pull_data = fetch_data->pull_data;
GError *local_error = NULL;
GError **error = &local_error;
GCancellable *cancellable = NULL;
ot_lvariant GVariant *xattrs = NULL;
ot_lobj GInputStream *file_in = NULL;
ot_lobj GInputStream *object_input = NULL;
+ const char *checksum;
+ OstreeObjectType objtype;
- data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
- if (!data->content_path)
+ fetch_data->temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+ if (!fetch_data->temp_path)
goto out;
- g_assert (data->content_path != NULL);
- if (!ostree_content_file_parse (TRUE, data->content_path, FALSE,
+ ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
+ g_assert (objtype == OSTREE_OBJECT_TYPE_FILE);
+
+ g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype));
+
+ if (!ostree_content_file_parse (TRUE, fetch_data->temp_path, FALSE,
&file_in, &file_info, &xattrs,
cancellable, error))
goto out;
cancellable, error))
goto out;
- data->pull_data->outstanding_content_stage_requests++;
- ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
+ pull_data->n_outstanding_content_stage_requests++;
+ ostree_repo_stage_content_async (pull_data->repo, checksum,
object_input, length,
cancellable,
- content_fetch_on_stage_complete, data);
+ content_fetch_on_stage_complete, fetch_data);
out:
- data->pull_data->outstanding_filecontent_requests--;
- check_outstanding_requests_handle_error (data->pull_data, local_error);
+ pull_data->n_outstanding_content_fetches--;
+ check_outstanding_requests_handle_error (pull_data, local_error);
}
-static gboolean
-idle_queue_content_request (gpointer user_data)
-{
- OtFetchOneContentItemData *data = user_data;
- OtPullData *pull_data = data->pull_data;
- const char *checksum = data->checksum;
- ot_lfree char *objpath = NULL;
- SoupURI *obj_uri = NULL;
-
- objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE, TRUE);
- obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
-
- ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
- content_fetch_on_complete, data);
- soup_uri_free (obj_uri);
-
- pull_data->outstanding_filecontent_requests++;
-
- return FALSE;
-}
-
-typedef struct {
- OtPullData *pull_data;
- GVariant *object;
- GFile *temp_path;
-} IdleFetchMetadataObjectData;
-
static void
on_metadata_staged (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
- IdleFetchMetadataObjectData *fetch_data = user_data;
+ FetchObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data;
+ GError *local_error = NULL;
+ GError **error = &local_error;
+ const char *expected_checksum;
+ OstreeObjectType objtype;
+ gs_free char *checksum = NULL;
+ gs_free guchar *csum = NULL;
- pull_data->n_fetched_metadata++;
- pull_data->n_outstanding_metadata_fetches--;
+ if (!ostree_repo_stage_metadata_finish ((OstreeRepo*)object, result,
+ &csum, error))
+ goto out;
- ot_worker_queue_push (pull_data->metadata_objects_to_scan,
- g_variant_ref (fetch_data->object));
+ checksum = ostree_checksum_from_bytes (csum);
+
+ ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype);
+ g_assert (OSTREE_OBJECT_TYPE_IS_META (objtype));
+
+ g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype));
+ g_assert (strcmp (checksum, expected_checksum) == 0);
+
+ pull_data->metadata_scan_idle = FALSE;
+ ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+ pull_worker_message_new (PULL_MSG_SCAN,
+ g_variant_ref (fetch_data->object)));
+ out:
+ pull_data->n_outstanding_metadata_stage_requests--;
(void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
g_object_unref (fetch_data->temp_path);
g_variant_unref (fetch_data->object);
g_free (fetch_data);
+
+ check_outstanding_requests_handle_error (pull_data, local_error);
}
static void
GAsyncResult *result,
gpointer user_data)
{
- IdleFetchMetadataObjectData *fetch_data = user_data;
+ FetchObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data;
ot_lvariant GVariant *metadata = NULL;
const char *checksum;
ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
+ g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype));
+
if (!ot_util_variant_map (fetch_data->temp_path, ostree_metadata_variant_type (objtype),
FALSE, &metadata, error))
goto out;
pull_data->cancellable,
on_metadata_staged, fetch_data);
+ pull_data->n_outstanding_metadata_stage_requests++;
out:
+ pull_data->n_outstanding_metadata_fetches--;
+ pull_data->n_fetched_metadata++;
throw_async_error (pull_data, local_error);
if (local_error)
{
}
}
-static gboolean
-idle_fetch_metadata_object (gpointer data)
-{
- IdleFetchMetadataObjectData *fetch_data = data;
- OtPullData *pull_data = fetch_data->pull_data;
- ot_lfree char *objpath = NULL;
- const char *checksum;
- OstreeObjectType objtype;
- SoupURI *obj_uri = NULL;
-
- ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
-
- objpath = ostree_get_relative_object_path (checksum, objtype, TRUE);
- obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
-
- pull_data->n_outstanding_metadata_fetches++;
- ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
- meta_fetch_on_complete, fetch_data);
- soup_uri_free (obj_uri);
-
- return FALSE;
-}
-
-/**
- * queue_metadata_object_fetch:
- *
- * Pass a request to the main thread to fetch a metadata object.
- */
-static void
-queue_metadata_object_fetch (OtPullData *pull_data,
- GVariant *object)
-{
- IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1);
- fetch_data->pull_data = pull_data;
- fetch_data->object = g_variant_ref (object);
- g_idle_add (idle_fetch_metadata_object, fetch_data);
-}
-
static gboolean
scan_commit_object (OtPullData *pull_data,
const char *checksum,
gboolean ret = FALSE;
ot_lvariant GVariant *object = NULL;
ot_lfree char *tmp_checksum = NULL;
+ gboolean is_requested;
gboolean is_stored;
tmp_checksum = ostree_checksum_from_bytes (csum);
if (g_hash_table_lookup (pull_data->scanned_metadata, object))
return TRUE;
+ is_requested = g_hash_table_lookup (pull_data->requested_metadata, tmp_checksum) != NULL;
if (!ostree_repo_has_object (pull_data->repo, objtype, tmp_checksum, &is_stored,
cancellable, error))
goto out;
-
- if (!is_stored)
+
+ if (!is_stored && !is_requested)
{
- g_atomic_int_inc (&pull_data->n_requested_metadata);
- queue_metadata_object_fetch (pull_data, object);
+ char *duped_checksum = g_strdup (tmp_checksum);
+ g_hash_table_insert (pull_data->requested_metadata, duped_checksum, duped_checksum);
+
+ ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+ pull_worker_message_new (PULL_MSG_FETCH,
+ g_variant_ref (object)));
}
- else
+ else if (is_stored)
{
switch (objtype)
{
}
g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
g_atomic_int_inc (&pull_data->n_scanned_metadata);
-
- g_idle_add (idle_check_outstanding_requests, pull_data);
}
ret = TRUE;
return FALSE;
}
-/**
- * scan_one_metadata_object_dispatch:
- *
- * Called from the metadatascan worker thread. If we're missing an
- * object from one of them, we queue a request to the main thread to
- * fetch it. When it's fetched, we get passed the object back and
- * scan it.
- */
-static void
-scan_one_metadata_object_dispatch (gpointer item,
- gpointer user_data)
+static gboolean
+on_metadata_objects_to_scan_ready (gint fd,
+ GIOCondition condition,
+ gpointer user_data)
{
OtPullData *pull_data = user_data;
+ PullWorkerMessage *msg;
+ PullWorkerMessage *last_idle_msg = NULL;
GError *local_error = NULL;
GError **error = &local_error;
- ot_lvariant GVariant *v_item = NULL;
-
- v_item = item;
- if (!scan_one_metadata_object_v_name (pull_data, v_item,
- pull_data->cancellable, error))
- goto out;
+ while (ot_waitable_queue_pop (pull_data->metadata_objects_to_scan, (gpointer*)&msg))
+ {
+ if (msg->t == PULL_MSG_SCAN)
+ {
+ if (!scan_one_metadata_object_v_name (pull_data, msg->d.item,
+ pull_data->cancellable, error))
+ goto out;
+ g_variant_unref (msg->d.item);
+ g_free (msg);
+ }
+ else if (msg->t == PULL_MSG_MAIN_IDLE)
+ {
+ g_free (last_idle_msg);
+ last_idle_msg = msg;
+ }
+ else if (msg->t == PULL_MSG_QUIT)
+ {
+ g_free (msg);
+ g_main_loop_quit (pull_data->metadata_thread_loop);
+ }
+ else
+ g_assert_not_reached ();
+ }
+
+ if (last_idle_msg)
+ ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+ last_idle_msg);
+
+ /* When we have no queue to process, notify the main thread */
+ ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+ pull_worker_message_new (PULL_MSG_SCAN_IDLE, GUINT_TO_POINTER (0)));
out:
if (local_error)
throwdata->error = local_error;
g_main_context_invoke (NULL, idle_throw_error, throwdata);
}
+ return TRUE;
}
+/**
+ * metadata_thread_main:
+ *
+ * Called from the metadatascan worker thread. If we're missing an
+ * object from one of them, we queue a request to the main thread to
+ * fetch it. When it's fetched, we get passed the object back and
+ * scan it.
+ */
+static gpointer
+metadata_thread_main (gpointer user_data)
+{
+ OtPullData *pull_data = user_data;
+ GSource *src;
+
+ pull_data->metadata_thread_context = g_main_context_new ();
+ pull_data->metadata_thread_loop = g_main_loop_new (pull_data->metadata_thread_context, TRUE);
+
+ src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_scan);
+ g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_scan_ready, pull_data, NULL);
+ g_source_attach (src, pull_data->metadata_thread_context);
+ g_source_unref (src);
+
+ g_main_loop_run (pull_data->metadata_thread_loop);
+ return NULL;
+}
static gboolean
-idle_start_worker (gpointer user_data)
+on_metadata_objects_to_fetch_ready (gint fd,
+ GIOCondition condition,
+ gpointer user_data)
{
OtPullData *pull_data = user_data;
+ PullWorkerMessage *msg;
+
+ if (!ot_waitable_queue_pop (pull_data->metadata_objects_to_fetch, (gpointer*)&msg))
+ goto out;
- ot_worker_queue_start (pull_data->metadata_objects_to_scan);
+ if (msg->t == PULL_MSG_MAIN_IDLE)
+ {
+ if (msg->d.idle_serial == pull_data->idle_serial)
+ {
+ g_assert (!pull_data->metadata_scan_idle);
+ pull_data->metadata_scan_idle = TRUE;
+ g_debug ("pull: metadata scan is idle");
+ }
+ }
+ else if (msg->t == PULL_MSG_SCAN_IDLE)
+ {
+ if (!pull_data->metadata_scan_idle)
+ {
+ g_debug ("pull: queue MAIN_IDLE");
+ pull_data->idle_serial++;
+ ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+ pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
+ }
+ }
+ else if (msg->t == PULL_MSG_FETCH)
+ {
+ const char *checksum;
+ gs_free char *objpath = NULL;
+ OstreeObjectType objtype;
+ SoupURI *obj_uri = NULL;
+ gboolean is_meta;
+ FetchObjectData *fetch_data;
+
+ ostree_object_name_deserialize (msg->d.item, &checksum, &objtype);
+ objpath = ostree_get_relative_object_path (checksum, objtype, TRUE);
+ obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
- return FALSE;
+ is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);
+ if (is_meta)
+ {
+ pull_data->n_outstanding_metadata_fetches++;
+ pull_data->n_requested_metadata++;
+ }
+ else
+ {
+ pull_data->n_outstanding_content_fetches++;
+ pull_data->n_requested_content++;
+ }
+ fetch_data = g_new (FetchObjectData, 1);
+ fetch_data->pull_data = pull_data;
+ fetch_data->object = g_variant_ref (msg->d.item);
+ ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
+ is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
+ soup_uri_free (obj_uri);
+ g_variant_unref (msg->d.item);
+ }
+ else
+ {
+ g_assert_not_reached ();
+ }
+ g_free (msg);
+
+ out:
+ check_outstanding_requests_handle_error (pull_data, NULL);
+
+ return TRUE;
}
static gboolean
(GDestroyNotify)g_variant_unref, NULL);
pull_data->requested_content = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify)g_free, NULL);
+ pull_data->requested_metadata = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify)g_free, NULL);
if (argc < 2)
{
if (!ostree_repo_prepare_transaction (pull_data->repo, FALSE, NULL, error))
goto out;
- pull_data->metadata_scan_active = TRUE;
-
- pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan",
- scan_one_metadata_object_dispatch,
- pull_data);
+ pull_data->metadata_objects_to_fetch = ot_waitable_queue_new ();
+ pull_data->metadata_objects_to_scan = ot_waitable_queue_new ();
+ pull_data->metadata_thread = g_thread_new ("metadatascan", metadata_thread_main, pull_data);
g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
{
const char *commit = value;
- ot_worker_queue_push (pull_data->metadata_objects_to_scan,
- ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT));
+ ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+ pull_worker_message_new (PULL_MSG_SCAN,
+ ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT)));
}
g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch);
}
else
{
- ot_worker_queue_push (pull_data->metadata_objects_to_scan,
- ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT));
+ ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+ pull_worker_message_new (PULL_MSG_SCAN,
+ ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT)));
g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
}
}
- /* Start metadata thread, which kicks off further metadata requests
- * as well as content fetches.
- */
- if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan))
- {
- g_idle_add (idle_start_worker, pull_data);
-
- /* Now await work completion */
- if (!run_mainloop_monitor_fetcher (pull_data))
- goto out;
- }
+ {
+ GSource *src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_fetch);
+ g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_fetch_ready, pull_data, NULL);
+ g_source_attach (src, NULL);
+ g_source_unref (src);
+ }
+
+ /* Prime the message queue */
+ pull_data->idle_serial++;
+ ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+ pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
+
+ /* Now await work completion */
+ if (!run_mainloop_monitor_fetcher (pull_data))
+ goto out;
if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error))
goto out;
g_free (pull_data->remote_name);
if (pull_data->base_uri)
soup_uri_free (pull_data->base_uri);
- g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_worker_queue_unref);
+ if (pull_data->metadata_thread)
+ {
+ ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+ pull_worker_message_new (PULL_MSG_QUIT, NULL));
+ g_thread_join (pull_data->metadata_thread);
+ }
+ g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_waitable_queue_unref);
+ g_clear_pointer (&pull_data->metadata_objects_to_fetch, (GDestroyNotify) ot_waitable_queue_unref);
g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref);
+ g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref);
if (summary_uri)
soup_uri_free (summary_uri);